package com.ai.common.gpt.subscriber;

import com.ai.common.gpt.openAiRes.text.OpenAiResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.FluxSink;

@Slf4j
public class OpenAISubscriber implements Subscriber<String>, Disposable {
    private final FluxSink<String> emitter;
    private Subscription subscription;
    private final StringBuilder sb;

    public OpenAISubscriber(FluxSink<String> emitter) {
        this.emitter = emitter;
        this.sb = new StringBuilder();
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String data) {
        //log.info("OpenAI返回数据：{}", data);
        if ("[DONE]".equals(data)) {
            log.info("OpenAI返回数据结束了");
            subscription.request(1);
            emitter.next(JSON.toJSONString(new OpenAIMessageResponse("", true)));
            emitter.complete();
        } else {
            OpenAiResponse openAiResponse = JSON.parseObject(data, OpenAiResponse.class);
            String content = openAiResponse.getChoices().get(0).getDelta().getContent();
            content = content == null ? "" : content;
            emitter.next(JSON.toJSONString(new OpenAIMessageResponse(content, null)));
            sb.append(content);
            subscription.request(1);
        }
    }

    @Override
    public void onError(Throwable t) {
        log.error("OpenAI返回数据异常：{}", t.getMessage());
        emitter.error(t);
//        completedCallBack.fail(sessionId);
    }

    @Override
    public void onComplete() {
        log.info("OpenAI返回数据完成");
//        if (messageType == MessageType.IMAGE) {
//            OpenAiImageResponse aiImageResponse = JSON.parseObject(sb.toString(), OpenAiImageResponse.class);
//            String url = aiImageResponse.getData().stream().map(DataRes::getUrl).collect(Collectors.joining(","));
//            emitter.next(JSON.toJSONString(new MessageRes(MessageType.IMAGE, url, true)));
//        }
        emitter.complete();
    }

    @Override
    public void dispose() {
        log.info("OpenAI返回数据关闭");
        emitter.complete();
    }
}